{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TFX pipeline example - Chicago Taxi tips prediction\n",
    "\n",
    "## Overview\n",
    "[Tensorflow Extended (TFX)](https://github.com/tensorflow/tfx) is a Google-production-scale machine\n",
    "learning platform based on TensorFlow. It provides a configuration framework to express ML pipelines\n",
    "consisting of TFX components, which brings the user large-scale ML task orchestration, artifact lineage, as well as the power of various [TFX libraries](https://www.tensorflow.org/resources/libraries-extensions). Kubeflow Pipelines can be used as the orchestrator supporting the \n",
    "execution of a TFX pipeline.\n",
    "\n",
    "This sample demonstrates how to author a ML pipeline in TFX and run it on a KFP deployment. \n",
    "\n",
    "## Permission\n",
    "\n",
    "This pipeline requires Google Cloud Storage permission to run. \n",
    "If KFP was deployed through K8S marketplace, please make sure **\"Allow access to the following Cloud APIs\"** is checked when creating the cluster. <img src=\"check_permission.png\">\n",
    "Otherwise, follow instructions in [the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials) to guarantee at least, that the service account has `storage.admin` role."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!python3 -m pip install pip --upgrade --quiet --user\n",
    "!python3 -m pip install kfp --upgrade --quiet --user\n",
    "pip install tfx==1.4.0 tensorflow==2.5.1 --quiet --user"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Note: if you're warned by \n",
    "```\n",
    "WARNING: The script {LIBRARY_NAME} is installed in '/home/jupyter/.local/bin' which is not on PATH.\n",
    "```\n",
    "You might need to fix by running the next cell and restart the kernel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "# Set `PATH` to include user python binary directory and a directory containing `skaffold`.\n",
    "PATH=%env PATH\n",
    "%env PATH={PATH}:/home/jupyter/.local/bin"
   ]
  },
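  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To confirm that the change took effect, a minimal sketch like the following checks whether a directory is listed on `PATH`. The helper name `is_on_path` is hypothetical and not part of any library; the directory is the one named in the warning above.\n",
    "\n",
    "```python\n",
    "import os\n",
    "\n",
    "\n",
    "def is_on_path(directory, path_value=None):\n",
    "    # Hypothetical helper: True if `directory` is one of the PATH entries.\n",
    "    if path_value is None:\n",
    "        path_value = os.environ.get('PATH', '')\n",
    "    target = os.path.normpath(directory)\n",
    "    entries = [p for p in path_value.split(os.pathsep) if p]\n",
    "    return any(os.path.normpath(p) == target for p in entries)\n",
    "\n",
    "\n",
    "# Check the user binary directory mentioned in the pip warning.\n",
    "print(is_on_path('/home/jupyter/.local/bin'))\n",
    "```\n",
    "\n",
    "If this prints `False` after the cell above has run, the kernel most likely still needs a restart."
   ]
  },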
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example we'll need TFX SDK later than 0.21 to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
    "\n",
    "## RuntimeParameter in TFX DSL\n",
    "Currently, TFX DSL only supports parameterizing field in the `PARAMETERS` section of `ComponentSpec`, see [here](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/types/component_spec.py#L126). This prevents runtime-parameterizing the pipeline topology. Also, if the declared type of the field is a protobuf, the user needs to pass in a dictionary with exactly the same names for each field, and specify one or more value as `RuntimeParameter` objects. In other word, the dictionary should be able to be passed in to [`ParseDict()` method](https://github.com/protocolbuffers/protobuf/blob/04a11fc91668884d1793bff2a0f72ee6ce4f5edd/python/google/protobuf/json_format.py#L433) and produce the correct pb message."
   ]
  },
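  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The field-name requirement above can be illustrated with a minimal, standard-library sketch. The helper `unknown_fields` and the nested `schema` dictionary are hypothetical stand-ins, not TFX or protobuf APIs; in a real pipeline the check happens when the dictionary is parsed into the protobuf message. The field names `filesystem` and `base_directory` are the ones this notebook uses for the push destination.\n",
    "\n",
    "```python\n",
    "def unknown_fields(candidate, schema, prefix=''):\n",
    "    # Hypothetical helper: list dotted paths of keys in `candidate`\n",
    "    # that have no counterpart in `schema`.\n",
    "    unknown = []\n",
    "    for key, value in candidate.items():\n",
    "        path = prefix + key\n",
    "        if key not in schema:\n",
    "            unknown.append(path)\n",
    "        elif isinstance(value, dict) and isinstance(schema[key], dict):\n",
    "            unknown.extend(unknown_fields(value, schema[key], path + '.'))\n",
    "    return unknown\n",
    "\n",
    "\n",
    "# Assumed shape of the message, written as nested field names only.\n",
    "schema = {'filesystem': {'base_directory': None}}\n",
    "\n",
    "good = {'filesystem': {'base_directory': 'gs://your-bucket/serving_model'}}\n",
    "bad = {'filesystem': {'base_dir': 'gs://your-bucket/serving_model'}}\n",
    "\n",
    "print(unknown_fields(good, schema))  # prints []\n",
    "print(unknown_fields(bad, schema))   # prints ['filesystem.base_dir']\n",
    "```\n",
    "\n",
    "A misspelled key such as `base_dir` is reported, which mirrors why the dictionary has to use exactly the protobuf field names."
   ]
  },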
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "\n",
    "import kfp\n",
    "import tensorflow_model_analysis as tfma\n",
    "from tfx import v1 as tfx"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# In TFX MLMD schema, pipeline name is used as the unique id of each pipeline.\n",
    "# Assigning workflow ID as part of pipeline name allows the user to bypass\n",
    "# some schema checks which are redundant for experimental pipelines.\n",
    "pipeline_name = 'taxi_pipeline_with_parameters'\n",
    "\n",
    "# Path of pipeline data root, should be a GCS path.\n",
    "# Note that when running on KFP, the pipeline root is always a runtime parameter.\n",
    "# The value specified here will be its default.\n",
    "pipeline_root = os.path.join('gs://{{kfp-default-bucket}}', 'tfx_taxi_simple',\n",
    "                              kfp.dsl.RUN_ID_PLACEHOLDER)\n",
    "\n",
    "# Location of input data, should be a GCS path under which there is a csv file.\n",
    "data_root = '/opt/conda/lib/python3.7/site-packages/tfx/examples/chicago_taxi_pipeline/data/simple'\n",
    "\n",
    "# Path to the module file, GCS path.\n",
    "# Module file is one of the recommended way to provide customized logic for component\n",
    "# includeing Trainer and Transformer.\n",
    "# See https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/components/trainer/component.py#L38\n",
    "taxi_module_file_param = tfx.dsl.experimental.RuntimeParameter(\n",
    "    name='module-file',\n",
    "    default='/opt/conda/lib/python3.7/site-packages/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py',\n",
    "    ptype=str,\n",
    ")\n",
    "# Path that ML models are pushed, should be a GCS path.\n",
    "# TODO: CHANGE the GCS bucket name to yours.\n",
    "serving_model_dir = os.path.join('gs://your-bucket', 'serving_model', 'tfx_taxi_simple')\n",
    "push_destination = tfx.dsl.experimental.RuntimeParameter(\n",
    "     name='push_destination',\n",
    "     default=json.dumps({'filesystem': {'base_directory': serving_model_dir}}),\n",
    "     ptype=str,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## TFX Components\n",
    "\n",
    "Please refer to the [official guide](https://www.tensorflow.org/tfx/guide#tfx_pipeline_components) for the detailed explanation and purpose of each TFX component."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "example_gen = tfx.components.CsvExampleGen(input_base=data_root)\n",
    "\n",
    "statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])\n",
    "\n",
    "schema_gen = tfx.components.SchemaGen(\n",
    "    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)\n",
    "\n",
    "example_validator = tfx.components.ExampleValidator(\n",
    "  statistics=statistics_gen.outputs['statistics'],\n",
    "  schema=schema_gen.outputs['schema'])\n",
    "\n",
    "# The module file used in Transform and Trainer component is paramterized by\n",
    "# _taxi_module_file_param.\n",
    "transform = tfx.components.Transform(\n",
    "  examples=example_gen.outputs['examples'],\n",
    "  schema=schema_gen.outputs['schema'],\n",
    "  module_file=taxi_module_file_param)\n",
    "\n",
    "# The numbers of steps in train_args are specified as RuntimeParameter with\n",
    "# name 'train-steps' and 'eval-steps', respectively.\n",
    "trainer = tfx.components.Trainer(\n",
    "  module_file=taxi_module_file_param,\n",
    "  examples=transform.outputs['transformed_examples'],\n",
    "  schema=schema_gen.outputs['schema'],\n",
    "  transform_graph=transform.outputs['transform_graph'],\n",
    "  train_args=tfx.proto.TrainArgs(num_steps=10),\n",
    "  eval_args=tfx.proto.EvalArgs(num_steps=5))\n",
    "\n",
    "# Set the TFMA config for Model Evaluation and Validation.\n",
    "eval_config = tfma.EvalConfig(\n",
    "    model_specs=[\n",
    "      tfma.ModelSpec(\n",
    "          signature_name='serving_default', label_key='tips_xf',\n",
    "          preprocessing_function_names=['transform_features'])\n",
    "    ],\n",
    "    metrics_specs=[\n",
    "      tfma.MetricsSpec(\n",
    "          # The metrics added here are in addition to those saved with the\n",
    "          # model (assuming either a keras model or EvalSavedModel is used).\n",
    "          # Any metrics added into the saved model (for example using\n",
    "          # model.compile(..., metrics=[...]), etc) will be computed\n",
    "          # automatically.\n",
    "          metrics=[\n",
    "              tfma.MetricConfig(class_name='ExampleCount')\n",
    "          ],\n",
    "          # To add validation thresholds for metrics saved with the model,\n",
    "          # add them keyed by metric name to the thresholds map.\n",
    "          thresholds = {\n",
    "              'binary_accuracy': tfma.MetricThreshold(\n",
    "                  value_threshold=tfma.GenericValueThreshold(\n",
    "                      lower_bound={'value': 0.5}),\n",
    "                  change_threshold=tfma.GenericChangeThreshold(\n",
    "                     direction=tfma.MetricDirection.HIGHER_IS_BETTER,\n",
    "                     absolute={'value': -1e-10}))\n",
    "          }\n",
    "      )\n",
    "    ],\n",
    "    slicing_specs=[\n",
    "      # An empty slice spec means the overall slice, i.e. the whole dataset.\n",
    "      tfma.SlicingSpec(),\n",
    "      # Data can be sliced along a feature column. In this case, data is\n",
    "      # sliced along feature column trip_start_hour.\n",
    "      tfma.SlicingSpec(feature_keys=['trip_start_hour'])\n",
    "    ])\n",
    "\n",
    "# The name of slicing column is specified as a RuntimeParameter.\n",
    "evaluator = tfx.components.Evaluator(\n",
    "  examples=example_gen.outputs['examples'],\n",
    "  model=trainer.outputs['model'],\n",
    "  eval_config=eval_config)\n",
    "\n",
    "pusher = tfx.components.Pusher(\n",
    "  model=trainer.outputs['model'],\n",
    "  model_blessing=evaluator.outputs['blessing'],\n",
    "  push_destination=push_destination)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the DSL pipeline object.\n",
    "# This pipeline obj carries the business logic of the pipeline, but no runner-specific information\n",
    "# was included.\n",
    "dsl_pipeline = tfx.dsl.Pipeline(\n",
    "  pipeline_name=pipeline_name,\n",
    "  pipeline_root=pipeline_root,\n",
    "  components=[\n",
    "      example_gen, statistics_gen, schema_gen, example_validator, transform,\n",
    "      trainer, evaluator, pusher\n",
    "  ],\n",
    "  enable_cache=True,\n",
    "  beam_pipeline_args=['--direct_num_workers=%d' % 0],\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Specify a TFX docker image. For the full list of tags please see:\n",
    "# https://hub.docker.com/r/tensorflow/tfx/tags\n",
    "tfx_image = 'gcr.io/tfx-oss-public/tfx:1.4.0'\n",
    "config = tfx.orchestration.experimental.KubeflowDagRunnerConfig(\n",
    "      kubeflow_metadata_config=tfx.orchestration.experimental\n",
    "      .get_default_kubeflow_metadata_config(),\n",
    "      tfx_image=tfx_image)\n",
    "kfp_runner = tfx.orchestration.experimental.KubeflowDagRunner(config=config)\n",
    "# KubeflowDagRunner compiles the DSL pipeline object into KFP pipeline package.\n",
    "# By default it is named <pipeline_name>.tar.gz\n",
    "kfp_runner.run(dsl_pipeline)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "run_result = kfp.Client(\n",
    "    host='1234567abcde-dot-us-central2.pipelines.googleusercontent.com'  # Put your KFP endpoint here\n",
    ").create_run_from_pipeline_package(\n",
    "    pipeline_name + '.tar.gz', \n",
    "    arguments={\n",
    "        # Uncomment following lines in order to use custom GCS bucket/module file/training data.\n",
    "        # 'pipeline-root': 'gs://<your-gcs-bucket>/tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
    "        # 'module-file': '<gcs path to the module file>',  # delete this line to use default module file.\n",
    "        # 'data-root': '<gcs path to the data>'  # delete this line to use default data.\n",
    "})"
   ]
  }
 ],
 "metadata": {
  "environment": {
   "name": "tf2-gpu.2-4.m69",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-4:m69"
  },
  "kernelspec": {
   "display_name": "Python [conda env:root] *",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "metadata": {
     "collapsed": false
    },
    "source": []
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
